use std::{
    collections::LinkedList,
    sync::{
        atomic::{AtomicBool, AtomicI64, AtomicU16},
        Arc, Mutex,
    },
};

use async_trait::async_trait;
use common::{base::id_generator, utils::time_utils};
use tracing::info;

use crate::net::tcp_net::TCPNet;

use super::{
    airport::{self, Airport},
    channel::Channel,
};

/// A channel backed by a TCP session.
///
/// Holds the shared `Airport` and `TCPNet` handles that the `Channel`
/// implementation in this file delegates to, plus the per-channel identifiers
/// and the small amount of mutable state (connection id, port, pending ack
/// ids) kept behind atomics / a mutex so it can be updated through `&self`.
pub struct TCPChannel {
    /// Shared airport handle; used in this file to reach the DXC manager, the
    /// channel manager, and to forward inbound messages.
    airport: Arc<Box<Airport>>,
    /// Shared TCP network handle (cloned from `airport.xprc_net` in `new`);
    /// used to send messages, close the session and look up the session's
    /// connection id.
    tcp_net: Arc<Box<TCPNet>>,
    /// Channel name. Initialised to an empty string in `new` and not read
    /// anywhere in this file.
    channel_name: String,
    /// Ids of messages waiting for an ack to be sent.
    wait_ack_id_list: Mutex<LinkedList<i64>>,
    /// Whether this is an active (initiated) or a passive connection.
    is_active: bool,
    /// Global connection information (connection id). Replaced in
    /// `set_channel_port` with the id that has the port substituted in;
    /// `on_close` treats 0 as "no connection id".
    conn_id: AtomicI64,
    /// Channel id; also the id passed to `TCPNet` for session operations.
    channel_id: i64,
    /// Timestamp taken when the channel was registered (set in `new`).
    register_time: i64,

    /// Channel port number; 0 until `set_channel_port` stores a value.
    channel_port: AtomicU16,
    /// Set to `true` by `set_channel_port`; reported by `had_set_port`.
    is_set_port: AtomicBool,
}

impl TCPChannel {
    /// Creates a boxed `TCPChannel`.
    ///
    /// * `airport` - shared airport handle; its `xprc_net` is cloned and kept
    ///   as the channel's TCP network handle.
    /// * `conn_id` - initial connection id.
    /// * `channel_id` - id of this channel.
    /// * `is_active` - whether the connection is active or passive.
    ///
    /// The registration time is captured from `time_utils::cur_timestamp()`,
    /// the port starts at 0 and the "port set" flag starts as `false`.
    pub fn new(
        airport: Arc<Box<Airport>>,
        conn_id: i64,
        channel_id: i64,
        is_active: bool,
    ) -> Box<Self> {
        // Clone the net handle before `airport` is moved into the struct.
        let tcp_net = airport.xprc_net.clone();

        let channel = Self {
            airport,
            tcp_net,
            channel_name: String::new(),
            wait_ack_id_list: Mutex::new(LinkedList::new()),
            is_active,
            conn_id: AtomicI64::new(conn_id),
            channel_id,
            register_time: time_utils::cur_timestamp(),
            channel_port: AtomicU16::new(0),
            is_set_port: AtomicBool::new(false),
        };

        Box::new(channel)
    }
}

#[async_trait]
impl Channel for TCPChannel {
    /// No-op: this channel has nothing to do on init.
    fn init(&self) {}

    /// No-op: this channel has nothing to do on start.
    fn start(&self) {}

    /// No-op: this channel has nothing to do on finalize.
    /// (The method name's spelling is dictated by the `Channel` trait.)
    fn finialize(&self) {}

    /// Called when the channel is closed.
    ///
    /// If a non-zero connection id is recorded, calls
    /// `remove_channel_id_in_remote_dxc` on the DXC manager for this channel
    /// and logs the address derived from the connection id with the stored
    /// port substituted in. Otherwise (or if that derived value is 0) only the
    /// channel id is logged.
    async fn on_close(&self) {
        use std::sync::atomic::Ordering;

        let conn_id = self.conn_id.load(Ordering::Relaxed);

        let conn_info = if conn_id != 0 {
            self.airport
                .get_dxc_manager()
                .remove_channel_id_in_remote_dxc(conn_id, self.channel_id)
                .await;

            let channel_port = self.channel_port.load(Ordering::Relaxed);
            id_generator::replace_identity_port(conn_id, channel_port)
        } else {
            0
        };

        if conn_info != 0 {
            info!(
                "关闭通道 ip:port = {}",
                id_generator::conn_id_to_addr(conn_info)
            );
        } else {
            info!("关闭通道 id = {}", self.channel_id);
        }
    }

    /// Sends `message` over this channel's TCP session and returns the result
    /// reported by `TCPNet::send_message`.
    async fn send_message(&self, message: &Vec<u8>) -> bool {
        self.tcp_net.send_message(self.channel_id, message).await
    }

    /// Asks the TCP layer to close the session that belongs to this channel.
    async fn close_channel(&self) {
        self.tcp_net.close_session(self.channel_id).await;
    }

    /// Forwards an inbound message to the airport, tagged with this channel id.
    async fn message_in(&self, message: Vec<u8>) {
        self.airport.message_in(self.channel_id, message).await;
    }

    /// Always `true` for this channel type.
    fn is_login(&self) -> bool {
        true
    }

    /// Returns the active/passive flag given at construction.
    fn is_active(&self) -> bool {
        self.is_active
    }

    /// Records `port` for this channel and registers the resulting connection
    /// id with the channel manager and the DXC manager.
    ///
    /// Does nothing if the TCP layer has no connection id for this channel's
    /// session (the channel does not exist).
    async fn set_channel_port(&self, port: u16) {
        use std::sync::atomic::Ordering;

        let conn_id = match self.tcp_net.get_session_conn_id(self.channel_id).await {
            Some(conn_id) => conn_id,
            // The channel does not exist.
            None => return,
        };

        info!(
            "设置channel {} 的端口号 port:{}  conn_id = {}",
            self.channel_id, port, conn_id
        );

        let conn_id = id_generator::replace_identity_port(conn_id, port);

        // Write the data first...
        self.channel_port.store(port, Ordering::Relaxed);
        self.conn_id.store(conn_id, Ordering::Relaxed);

        // ...and only then publish the flag. The SeqCst store pairs with the
        // SeqCst load in `had_set_port`, so a caller that observes `true`
        // also observes the port and connection id written above.
        self.is_set_port.store(true, Ordering::SeqCst);

        self.airport
            .get_channel_manager()
            .add_conn_id_to_channel_map(conn_id, self.channel_id)
            .await;

        self.airport
            .get_dxc_manager()
            .add_channel_id_to_remote_dxc(conn_id, self.channel_id)
            .await;
    }

    /// Returns the channel id.
    fn id(&self) -> i64 {
        self.channel_id
    }

    /// Returns the timestamp captured when the channel was created.
    fn register_time(&self) -> i64 {
        self.register_time
    }

    /// Returns the currently stored connection id.
    fn conn_id(&self) -> i64 {
        self.conn_id.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns `true` once `set_channel_port` has stored a port.
    fn had_set_port(&self) -> bool {
        self.is_set_port.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Appends `msg_id` to the list of message ids waiting for an ack.
    ///
    /// Panics if the list's mutex is poisoned.
    fn add_normal_msg_and_wait_ack(&self, msg_id: i64) {
        self.wait_ack_id_list.lock().unwrap().push_back(msg_id);
    }

    /// Returns all pending message ids in insertion order and leaves the
    /// pending list empty.
    ///
    /// Panics if the list's mutex is poisoned.
    fn get_all_wait_for_send_msg_and_clear(&self) -> Vec<i64> {
        let mut wait_ack_id_list = self.wait_ack_id_list.lock().unwrap();

        // Move the whole list out (leaving an empty one behind) instead of
        // cloning each element and clearing afterwards.
        std::mem::take(&mut *wait_ack_id_list).into_iter().collect()
    }
}
